Netty主动推送数据到客户端实现方式 您所在的位置:网站首页 netty 服务端主动发送消息 Netty主动推送数据到客户端实现方式

Netty主动推送数据到客户端实现方式

#Netty主动推送数据到客户端实现方式| 来源: 网络整理| 查看: 265

需求场景:

随着物联网的发展,随之出现了各种传感器监测数据的实时发送,需要和netty服务器通讯,netty和传感器之间需要保持长连接(换句话说,netty和gateway之间都会主动给对方发送消息)

碰到的问题: netty作为服务器端如何主动的向传感器发送消息,我尝试当每个传感器连接到netty(TCP/IP)时使用一个map把该channelSocket的id和该channelSocket绑定在一起。

先定义一个ConcurrentHashMap,用来保存当前的连接。

import io.netty.channel.Channel; import java.util.concurrent.ConcurrentHashMap; /** * Created by zhou */ public class ChannelMap { public static int channelNum=0; private static ConcurrentHashMap channelHashMap=null;//concurrentHashmap以解决多线程冲突 public static ConcurrentHashMap getChannelHashMap() { return channelHashMap; } public static Channel getChannelByName(String name){ if(channelHashMap==null||channelHashMap.isEmpty()){ return null; } return channelHashMap.get(name); } public static void addChannel(String name,Channel channel){ if(channelHashMap==null){ channelHashMap=new ConcurrentHashMap(10); } channelHashMap.put(name,channel); channelNum++; } public static int removeChannelByName(String name){ if(channelHashMap.containsKey(name)){ channelHashMap.remove(name); return 0; }else{ return 1; } } } 在服务器端EchoServerHandler中的ChannelRead中保存当前的连接 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 获取的业务数据 ByteBuf in = (ByteBuf) msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept[" + request + "]"); // 保存当前连接 ChannelMap.addChannel(uuid,ctx.channel()); } 我在服务器端尝试每间隔一段时间获取这个ConcurrentHashMap如果里面已经有绑定的channelSocket,就使用write方法向客户端发送消息 @GetMapping("/configFrame") public Result configFrame(@RequestParam(name = "sim") String sim) { // 16进制 指令 String receiveStr = "..."; ConcurrentHashMap channelHashMap = ChannelMap.getChannelHashMap(); Channel channel = channelHashMap.get(sim); // 判断是否活跃 if(channel==null || !channel.isActive()){ ChannelMap.getChannelHashMap().remove(sim); return Result.error("连接已经中断"); } // 指令发送 ByteBuf bufff = Unpooled.buffer(); // 根据具体业务传输数据 bufff.writeBytes(receiveStr); channel.writeAndFlush(bufff).addListener((ChannelFutureListener) future -> { StringBuilder sb = new StringBuilder(); if(!StringUtils.isEmpty(sim)){ sb.append("【").append(sim).append("】"); } if (future.isSuccess()) { System.out.println(sb.toString()+"回写成功"+receiveStr); } else { System.out.println(sb.toString()+"回写失败"+receiveStr); } }); return Result.ok(); } 最好还是再写一个定时任务,监测map中的连接是否中断 import io.netty.channel.Channel; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import java.util.Iterator; import java.util.Map; @Configuration // 1.主要用于标记配置类,兼备Component的效果。 @EnableScheduling // 2.开启定时任务 public class ChannelScheduleTask { // 3.定时删除不活跃的连接 @Scheduled(cron = "0/5 * * * * ?") private void configureTasks() { if(ChannelMap.getChannelHashMap()!=null && ChannelMap.getChannelHashMap().size()>0){ Iterator iterator = ChannelMap.getChannelHashMap().entrySet().iterator(); while (iterator.hasNext()) { Map.Entry next = iterator.next(); String key = next.getKey(); Channel channel = next.getValue(); if(!channel.isActive()){ ChannelMap.getChannelHashMap().remove(key); } } } } }


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有